feat: Various improvements to memory pool configuration, logging, and documentation #2538
Conversation
Codecov Report

```
@@            Coverage Diff             @@
##               main    #2538    +/-  ##
============================================
+ Coverage     56.12%   59.17%   +3.04%
- Complexity      976     1458     +482
============================================
  Files           119      146      +27
  Lines         11743    13685    +1942
  Branches       2251     2363     +112
============================================
+ Hits           6591     8098    +1507
- Misses         4012     4360     +348
- Partials       1140     1227      +87
```

View full report in Codecov by Sentry.
```scala
// The allocator thinks the exported ArrowArray and ArrowSchema structs are not released,
// so it will report:
// Caused by: java.lang.IllegalStateException: Memory was leaked by query.
// Memory leaked: (516) Allocator(ROOT) 0/516/808/9223372036854775807 (res/actual/peak/limit)
// This seems to be a false-positive leak, because profiling shows no memory leak at the
// JVM. `allocator` reports a leak because it tracks the accumulated amount of memory
// allocated for ArrowArray and ArrowSchema, but these exported structs are released later
// on the native side.
// To clarify further: for ArrowArray and ArrowSchema, Arrow puts a release field into the
// memory region, a callback function pointer (a C function) that can be called to release
// these structs from native code as well. Once we wrap their memory addresses on the
// native side using FFI ArrowArray and ArrowSchema, and drop them later, the callback is
// invoked to release the memory.
// But on the JVM, the allocator doesn't know about this, so it still keeps the
// accumulated count.
// Manually calling `release` and `close` makes the allocator happy, but causes a JVM
// runtime failure.

// allocator.close()
```
I removed this comment since it refers to an allocator that no longer exists in this code.
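The removed comment describes the Arrow C Data Interface lifecycle: exported structs carry a release callback that the native side invokes later, so a JVM-side allocator that only tracks producer allocations reports a false-positive leak. The accounting mismatch can be illustrated with a minimal, hypothetical Java sketch (this is not the actual Arrow Java API; all names are invented):

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified illustration (not the real Arrow Java API): an exported struct
// carries a release callback that the consumer (the native side, in Comet's
// case) invokes later. An accountant that only counts producer-side
// allocations still reports the bytes as outstanding in the meantime.
public class ReleaseCallbackSketch {
    // Hypothetical accountant standing in for Arrow's BufferAllocator.
    static final AtomicLong allocated = new AtomicLong();

    interface Release { void run(); }

    static class ExportedStruct {
        final long bytes;
        final Release release; // callback the consumer calls when done

        ExportedStruct(long bytes) {
            this.bytes = bytes;
            allocated.addAndGet(bytes);
            this.release = () -> allocated.addAndGet(-bytes);
        }
    }

    public static void main(String[] args) {
        ExportedStruct array = new ExportedStruct(516);
        // Checked before the consumer releases, the struct looks leaked.
        System.out.println("outstanding before release: " + allocated.get());
        // Later, the consumer (e.g. a native FFI drop) runs the callback.
        array.release.run();
        System.out.println("outstanding after release: " + allocated.get());
    }
}
```

The point of the sketch is only the ordering: the "leak" check happens before the consumer's callback fires, which matches the false positive described in the comment.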
```scala
"Only applies to off-heap mode. " +
s"$TUNING_GUIDE.")
.doubleConf
.createWithDefault(1.0)
```
Default is 1.0 so that this change is not a breaking change
```scala
object CometExecIterator extends Logging {

  def getMemoryConfig(conf: SparkConf): MemoryConfig = {
    val numCores = numDriverOrExecutorCores(conf).toFloat
```
can number of cores be fractional? 🤔
No, this is an intConf. I updated this.
```scala
  def getMemoryConfig(conf: SparkConf): MemoryConfig = {
    val numCores = numDriverOrExecutorCores(conf).toFloat
    val coresPerTask = conf.get("spark.task.cpus", "1").toFloat
```
same here
No, this is an intConf. I updated this.
```scala
    if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
  }

  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
```
It would be nice to add a comment explaining what this expression is looking for, e.g. `local[*]`, perhaps with pseudocode?
I added some comments
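For reference, what `LOCAL_N_REGEX` accepts can be sketched in plain Java (a hedged illustration, not the Comet implementation): a master string `local[N]` yields `N` threads, while `local[*]` means all available processors.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustration of the pattern from the diff: matches "local[<digits>]"
// or "local[*]", capturing the thread count (or "*") as group 1.
public class LocalMasterSketch {
    static final Pattern LOCAL_N = Pattern.compile("local\\[([0-9]+|\\*)\\]");

    // Hypothetical helper showing how the captured group would be used.
    static int parseCores(String master) {
        Matcher m = LOCAL_N.matcher(master);
        if (!m.matches()) throw new IllegalArgumentException("not a local[N] master: " + master);
        String threads = m.group(1);
        return threads.equals("*")
            ? Runtime.getRuntime().availableProcessors() // "*" = all cores
            : Integer.parseInt(threads);
    }

    public static void main(String[] args) {
        System.out.println(parseCores("local[4]")); // prints 4
        System.out.println(parseCores("local[*]")); // machine-dependent
    }
}
```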
|
@parthchandra @comphead @mbutrovich This is now ready for review. There have been some changes in scope today, so please re-read the PR description.
```scala
"The type of memory pool to be used for Comet native execution. " +
"When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', " +
"The type of memory pool to be used for Comet native execution " +
"hen running Spark in on-heap mode. Available pool types are 'greedy', 'fair_spill', " +
```
Minor nitpick: I guess "hen" is a typo for "when"? (Probably no caps in "available" either?)
Yes, thanks. 🐔
coderfender left a comment:
Minor typo
```scala
.doc(
  "The type of memory pool to be used for Comet native execution " +
  "when running Spark in off-heap mode. Available pool types are 'greedy', 'fair_spill', " +
  "'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', " +
```
Are we not limiting the available memory pools?
I think we should, but as a separate PR
oops .. this was actually a copy-paste error - these are the on-heap pools. Updated.
Thanks @andygrove, I think the PR is good; it's waiting for
> When running in on-heap mode, Comet will use its own dedicated memory pools that are not shared with Spark.
> `fair_unified_global` allows any task to use the full off-heap memory pool.
Where is fair_unified_global used? I don’t seem to find it in the code.
Thanks. I have updated this.
```scala
val offHeapSize = ByteUnit.MiB.toBytes(conf.getSizeAsMb("spark.memory.offHeap.size"))
val memoryFraction = CometConf.COMET_EXEC_MEMORY_POOL_FRACTION.get()
val memoryLimit = (offHeapSize * memoryFraction).toLong
val memoryLimitPerTask = (memoryLimit.toFloat * coresPerTask / numCores).toLong
```
We should be able to use toDouble instead of toFloat here. I'm not super worried about rounding errors or overflow here, but better safe than sorry and we won't see a performance difference.
```scala
val memoryLimit = CometSparkSessionExtensions.getCometMemoryOverhead(conf)
// example 16GB maxMemory * 16 cores with 4 cores per task results
// in memory_limit_per_task = 16 GB * 4 / 16 = 16 GB / 4 = 4GB
val memoryLimitPerTask = (memoryLimit.toFloat * coresPerTask / numCores).toLong
```
Same comment about toDouble.
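The per-task arithmetic in the snippet above, rewritten with `double` as the review suggests, can be sketched as follows (a hypothetical helper, not the Comet code):

```java
// Sketch of memoryLimitPerTask = memoryLimit * coresPerTask / numCores,
// using double (per the review) instead of float for the intermediate value.
public class MemoryPerTaskSketch {
    static long memoryLimitPerTask(long memoryLimit, int coresPerTask, int numCores) {
        return (long) ((double) memoryLimit * coresPerTask / numCores);
    }

    public static void main(String[] args) {
        long sixteenGb = 16L * 1024 * 1024 * 1024;
        // 16 GB * 4 / 16 = 4 GB, matching the inline comment in the diff.
        System.out.println(memoryLimitPerTask(sixteenGb, 4, 16)); // prints 4294967296
    }
}
```

Using `double` keeps 52 bits of mantissa versus `float`'s 23, so byte counts up to several petabytes survive the multiply/divide without rounding loss.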
mbutrovich left a comment:
This looks like a huge improvement in configuring Comet! Thanks @andygrove!
> Comet Performance
>
> - Comet requires at least 5 GB of RAM in off-heap mode and 6 GB RAM in on-heap mode, but performance at this level
> - Comet requires at least 5 GB of RAM, but performance at this level
is it on-heap or off-heap?
comphead left a comment:
Thanks @andygrove
Which issue does this PR close?
N/A
Rationale for this change
Now that Comet requires off-heap mode (except for when we are running tests), remove all of the confusing documentation about configuring Comet on-heap memory pools.
Also, add a new config for controlling what percentage of the off-heap memory pool can be used by Comet (required because Comet memory accounting is not accurate).
With this PR, there are now only two user-facing memory pool configs:

- `spark.comet.exec.memoryPool`
- `spark.comet.exec.memoryPool.fraction`

What changes are included in this PR?

- `COMET_EXEC_MEMORY_POOL_FRACTION` for limiting the percentage of the off-heap pool that Comet can use (required because Comet memory accounting is not accurate)

How are these changes tested?
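How `spark.comet.exec.memoryPool.fraction` bounds the Comet pool can be sketched as follows (a hedged illustration of the `memoryLimit = offHeapSize * memoryFraction` computation shown in the diff; the helper name is invented):

```java
// Sketch of the fraction-based pool limit from the diff:
//   memoryLimit = (offHeapSize * memoryFraction).toLong
public class PoolFractionSketch {
    static long cometPoolBytes(long offHeapBytes, double fraction) {
        return (long) (offHeapBytes * fraction);
    }

    public static void main(String[] args) {
        long offHeap = 8L * 1024 * 1024 * 1024; // e.g. spark.memory.offHeap.size=8g
        // Default fraction 1.0 leaves the limit unchanged (not a breaking change).
        System.out.println(cometPoolBytes(offHeap, 1.0)); // prints 8589934592
        // A smaller fraction leaves headroom for inaccurate memory accounting.
        System.out.println(cometPoolBytes(offHeap, 0.8));
    }
}
```

The default of 1.0 preserves existing behavior, matching the reviewer's note that this change is not breaking; operators can lower the fraction to keep a safety margin.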